Skip to content

Conversation

@vladshub
Copy link
Contributor

@vladshub vladshub commented Dec 25, 2019

A way to manage message de-duplication in grabbit.

Implementation details:

  • Each instance would set its own policy, it can be the default - None, reject duplicates - Reject and ack duplicates - Ack
  • Each service can set a duration to store each message in the duplicates table
  • Once a message is received from grabbit we check if the message-id is in our table
  • If it's in the table we invoke the policy
  • If it's not in the table we add the message, using our internal transaction, to be stored in the db
  • Once the processing is complete we and the internal tx is committed the message-id is also committed into the DB.
  • In the background we have an additional scheduler running which deletes messages that are older then what is set in the policy (currently hardcoded every minute to reduce the number of messages deleted in iteration to a minute)

This PR closes issue #33

@coveralls
Copy link

coveralls commented Dec 25, 2019

Coverage Status

Coverage decreased (-0.2%) to 76.13% when pulling c2d8b8c on feat-33 into 60007d8 on master.

@vladshub vladshub requested a review from a user December 26, 2019 06:47
@vladshub vladshub added the enhancement New feature or request label Dec 26, 2019
Copy link
Contributor

@rhinof rhinof left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vladshub wouldn't it be easier to de-dup when publishing from the transactional outbox rather than when consuming the message?

if err != nil {
worker.span.LogFields(slog.String("grabbit", "failed processing duplicate"))
worker.log().WithError(err).Error("failed checking for existing message")
return true, err
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why returning true if you don't know the message really is a duplicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can't return nil for a bool value so I return true and an error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vladshub I would return the default value of a bool in go which is false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rhinof this way I'm more conservative since I'd rather have this message rejected as a duplicate and go to DLQ or through the error flow then to have processed it twice if I can't ensure deduplication.

return
}

isDuplicate, err := worker.handleDuplicates(bm, delivery, msgSpecificLogEntry)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldnt you want to "handle duplicates" in case of global handlers/ dead letter handlers invocation as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the deduplication is on the GbusMessage level in dlq/global we don't yet have the gbusMessage yet

}

//
func (d *deduper) StoreMessageID(tx *sql.Tx, id string) error {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you consider to store the message in another storage mechanism? like redis etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to introduce another infrastructure dependency but we can have different storages that would implement the same interface.

@vladshub
Copy link
Contributor Author

@vladshub wouldn't it be easier to de-dup when publishing from the transactional outbox rather than when consuming the message?

@rhinof we can but if we do that we can't guarantee that a service will receive only one since issues with the connection to rabbitmq might create additional duplications and also the time a message is in-flight might provide for additional duplications.


var _ deduplicator.Store = &deduper{}

type deduper struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpicking, the struct is named deduper which is the main struct here yet the file name is tx.go

gbus/worker.go Outdated
if worker.delicatePolicy == DeduplicationPolicyNone {
return false, nil
}
duplicate, err := worker.duplicateStore.MessageExists(message.IdempotencyKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we be passing in the active transaction ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For message exists there is no real need, we can and we pass in the StoreMessage but here it brings no to little value

@vladshub
Copy link
Contributor Author

@rhinof & @adiweiss can you please re-review?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants